package Kafka.Producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
/**
 * 1. 实现接口Partitioner
 * 2. 实现3个方法:partition,close,configure
 * 3. 编写partition方法,返回分区号
 */

public class MyPartitioner implements Partitioner {
    /*
          返回信息对应的分区
          @param topic         主题
         * @param key           消息的key
         * @param keyBytes      消息的key序列化后的字节数组
         * @param value         消息的value
         * @param valueBytes    消息的value序列化后的字节数组
         * @param cluster       集群元数据可以查看分区信息
         * @return
         */
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        // 创建partition
        int partition;
        // 获取消息
        String msgValue = o1.toString();
        // 判断消息是否包含 ansentai
        if (msgValue.contains("ansentai")){
            partition = 0;
        }else {
            partition = 1;
        }
        // 返回分区号
        return partition;
    }

    // 关闭资源
    @Override
    public void close() {

    }

    // 配置方法
    @Override
    public void configure(Map<String, ?> map) {

    }
}
